import torch
import numpy as np
from environments import SUPPORTED_ENVIRONMENTS, make_environment
from networks import ActorCritic, IntrinsicCuriosityModule
from utils import Recorder, Memory, load_checkpoint
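# This notebook compares three agents -- a random baseline, an actor-critic
# agent, and a curiosity-driven agent equipped with an Intrinsic Curiosity
# Module (ICM) -- on Atari and Super Mario Bros environments.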
print(SUPPORTED_ENVIRONMENTS)
recorder_1 = Recorder()
recorder_2 = Recorder()
pong = make_environment('Pong')
breakout = make_environment('Breakout')
mario_level_1 = make_environment('SuperMarioBros level 1')
mario_level_2 = make_environment('SuperMarioBros level 2')
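# The recorders capture frames during play so that episodes can be watched
# afterwards with recorder.replay(); two of them let us compare two agents
# side by side later on.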
class RandomAgent:

    def play(self, environment, max_games=1, max_steps=500, recorder=None):
        # Reset environment
        observation = environment.reset()
        # Initialize infos and recorder
        n_games, n_steps = 0, 0
        current_game_infos = {'game': 1, 'reward': 0, 'game_duration': 0}
        if recorder is not None:
            recorder.reset()
            recorder.record(environment)
        # Main loop
        while (n_steps < max_steps) and (n_games < max_games):
            # Interact with environment
            action = environment.action_space.sample()
            observation, extrinsic_reward, is_game_over, infos = environment.step(action)
            # Update infos and recorder
            n_steps += 1
            current_game_infos['reward'] += extrinsic_reward
            current_game_infos['game_duration'] += 1
            if recorder is not None:
                recorder.record(environment)
            if is_game_over:
                # Update infos
                n_games += 1
                print(current_game_infos)
                current_game_infos = {'game': n_games + 1, 'reward': 0, 'game_duration': 0}
                # Reset environment
                observation = environment.reset()
        # Stop recorder
        if recorder is not None:
            recorder.stop()
random_agent = RandomAgent()
random_agent.play(pong, max_games=1, max_steps=500, recorder=recorder_1)
recorder_1.replay()
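# The actor-critic agent below interleaves short rollouts (20 steps) with
# network updates: it stores values, log-policies, actions and rewards in a
# memory buffer, bootstraps the value of the last observation at the end of
# each rollout, and backpropagates the actor-critic loss when train=True.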
class ActorCriticAgent:

    def __init__(self, num_actions, checkpoint=None):
        # Initialize network, optimizer and memory
        self.network, self.trainable_parameters = self.init_network(num_actions)
        self.optimizer = torch.optim.Adam(self.trainable_parameters, lr=1e-4)
        self.memory = Memory()
        # Load pretrained model
        if checkpoint is not None:
            load_checkpoint(self.network, self.optimizer, checkpoint)

    def init_network(self, num_actions):
        # Initialize Actor-Critic
        network = {'actor_critic': ActorCritic(num_actions)}
        trainable_parameters = list(network['actor_critic'].parameters())
        return network, trainable_parameters
    def play(self, environment, max_games=1, max_steps=500, train=False, verbose=False, recorder=None):
        # Reset environment
        observation = environment.reset()
        # Initialize infos and recorder
        n_games, n_steps = 0, 0
        current_game_infos = {'game': n_games + 1, 'reward': 0, 'game_duration': 0}
        if recorder is not None:
            recorder.reset()
            recorder.record(environment)
        # Main loop
        while (n_steps < max_steps) and (n_games < max_games):
            # Start a new rollout (reset memory)
            self.init_rollout(observation)
            for rollout_step in range(20):
                # Interact with environment
                value, log_policy, action = self.network['actor_critic'](observation)
                self.memory.append({'value': value, 'log_policy': log_policy, 'action': action})
                observation, extrinsic_reward, is_game_over, infos = environment.step(action.numpy()[0])
                reward = self.get_reward(observation, extrinsic_reward)
                self.memory.append({'reward': reward})
                # Update infos and recorder
                n_steps += 1
                current_game_infos['reward'] += extrinsic_reward
                current_game_infos['game_duration'] += 1
                if recorder is not None:
                    recorder.record(environment)
                if is_game_over:
                    # Update infos
                    n_games += 1
                    print(current_game_infos)
                    current_game_infos = {'game': n_games + 1, 'reward': 0, 'game_duration': 0}
                    # Reset environment
                    observation = environment.reset()
                    # Interrupt rollout
                    break
            self.end_rollout(observation, is_game_over)
            if verbose:
                print(current_game_infos)
            if train:
                # Update neural network
                loss = self.compute_loss()
                self.backpropagate(loss)
        # Stop recorder
        if recorder is not None:
            recorder.stop()
    def init_rollout(self, observation):
        # Clear the memory and detach the network's internal state so that
        # gradients do not flow back into previous rollouts
        self.memory.reset()
        self.network['actor_critic'].detach_internal_state()

    def end_rollout(self, observation, is_game_over):
        # Bootstrap the value of the last observation (zero if the game ended)
        if is_game_over:
            next_value = torch.Tensor([[0]])
            self.network['actor_critic'].reset_internal_state()
        else:
            next_value = self.network['actor_critic'](observation)[0].detach()
        self.memory.append({'value': next_value})

    def get_reward(self, observation, extrinsic_reward):
        # Clip the reward to [-1, 1] to keep its scale comparable across games
        return np.clip(extrinsic_reward, -1, 1)

    def compute_loss(self):
        return self.network['actor_critic'].loss(self.memory)

    def backpropagate(self, loss, max_gradient_norm=40):
        # Gradient step with gradient norm clipping
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.trainable_parameters, max_gradient_norm)
        self.optimizer.step()
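# The Memory object from utils acts as the rollout buffer above. Its actual
# implementation is not shown here; a minimal sketch consistent with how it
# is used (reset(), append() with partial dicts, get_last()) might look like
# this -- the name MemorySketch and all details are assumptions:
class MemorySketch:
    def __init__(self):
        self.buffers = {}

    def reset(self):
        # Drop everything from the previous rollout
        self.buffers = {}

    def append(self, entries):
        # Each call appends one value per named buffer,
        # e.g. {'value': ..., 'log_policy': ..., 'action': ...}
        for key, value in entries.items():
            self.buffers.setdefault(key, []).append(value)

    def get_last(self, key):
        # Most recent entry for a given buffer
        return self.buffers[key][-1]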
smart_mario_agent = ActorCriticAgent(num_actions=mario_level_1.action_space.n)
smart_mario_agent.play(mario_level_1, max_games=3, max_steps=500, verbose=True, recorder=recorder_1)
recorder_1.replay()
smart_mario_agent.play(mario_level_1, max_games=10, max_steps=10000, train=True)
smart_mario_agent.play(mario_level_1, max_games=3, max_steps=500, recorder=recorder_1)
recorder_1.replay()
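# Ten games and 10,000 steps are only a brief training run; the checkpoint
# loaded below was presumably trained for about 4M steps, as its filename
# suggests.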
smart_mario_agent = ActorCriticAgent(num_actions=mario_level_1.action_space.n,
                                     checkpoint='models/smart_mario_agent_4M.tar')
smart_mario_agent.play(mario_level_1, max_games=3, max_steps=500, recorder=recorder_1)
recorder_1.replay()
class CuriousActorCriticAgent(ActorCriticAgent):

    def init_network(self, num_actions):
        network, trainable_parameters = super().init_network(num_actions)
        # Initialize Intrinsic Curiosity Module
        network['icm'] = IntrinsicCuriosityModule(num_actions)
        trainable_parameters += list(network['icm'].parameters())
        return network, trainable_parameters

    def init_rollout(self, observation):
        super().init_rollout(observation)
        # Encode the observation into features
        features = self.network['icm'].observation_encoder(observation)
        self.memory.append({'features': features})
    def end_rollout(self, observation, is_game_over):
        # Ignore the end of the game: always bootstrap from the last
        # observation, so game over is just another transition for this agent
        next_value = self.network['actor_critic'](observation)[0].detach()
        self.memory.append({'value': next_value})

    def get_reward(self, observation, extrinsic_reward):
        # Retrieve features and action from the previous step
        last_features = self.memory.get_last('features')
        last_action = self.memory.get_last('action')
        # Encode the observation into features
        features = self.network['icm'].observation_encoder(observation)
        # Try to work out for yourself the inputs and outputs of these two networks:
        predicted_features = self.network['icm'].forward_model(last_features, last_action)
        predicted_action = self.network['icm'].inverse_model(last_features, features)
        self.memory.append({'features': features,
                            'predicted_features': predicted_features,
                            'predicted_action': predicted_action})
        # Try to work out for yourself the inputs of the curiosity function.
        # Note that the extrinsic reward is discarded: this agent is driven
        # by curiosity alone.
        intrinsic_reward = self.network['icm'].curiosity(predicted_features, features)
        return np.clip(intrinsic_reward, -1, 1)

    def compute_loss(self):
        loss = super().compute_loss()
        # Add the ICM loss
        loss += self.network['icm'].loss(self.memory)
        return loss
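# The curiosity bonus rewards transitions the forward model fails to predict:
# the larger the gap between predicted_features and the features actually
# observed, the larger the intrinsic reward. The exact curiosity function of
# IntrinsicCuriosityModule is not shown here; a minimal sketch of the usual
# formulation (the scaling factor eta is an assumption) would be:
def curiosity_sketch(predicted_features, features, eta=0.5):
    # Mean squared error between predicted and observed features, detached
    # so the reward signal does not backpropagate into the encoder
    return eta * (predicted_features - features).detach().pow(2).mean().item()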
curious_mario_agent = CuriousActorCriticAgent(num_actions=mario_level_1.action_space.n,
                                              checkpoint='models/curious_mario_agent_4M.tar')
curious_mario_agent.play(mario_level_1, max_games=5, max_steps=1000, recorder=recorder_1)
recorder_1.replay()
smart_mario_agent = ActorCriticAgent(num_actions=mario_level_1.action_space.n,
                                     checkpoint='models/smart_mario_agent_4M.tar')
smart_mario_agent.play(mario_level_2, max_games=10, max_steps=5000, recorder=recorder_1, train=True)
curious_mario_agent = CuriousActorCriticAgent(num_actions=mario_level_1.action_space.n,
                                              checkpoint='models/curious_mario_agent_4M.tar')
curious_mario_agent.play(mario_level_2, max_games=10, max_steps=5000, recorder=recorder_2, train=True)
recorder_1.replay()
recorder_2.replay()
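# Both agents start from their level-1 checkpoints and are fine-tuned on
# level 2; replaying the two recorders lets you compare how well the
# extrinsically-trained and the curiosity-driven agent transfer to a level
# they have never seen.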